{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "view-in-github"
   },
   "source": [
    "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/partition-py.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "view-the-docs-top"
   },
   "source": [
    "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/partition\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "cellView": "form",
    "id": "_-code"
   },
   "outputs": [],
   "source": [
    "#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n",
    "# Licensed to the Apache Software Foundation (ASF) under one\n",
    "# or more contributor license agreements. See the NOTICE file\n",
    "# distributed with this work for additional information\n",
    "# regarding copyright ownership. The ASF licenses this file\n",
    "# to you under the Apache License, Version 2.0 (the\n",
    "# \"License\"); you may not use this file except in compliance\n",
    "# with the License. You may obtain a copy of the License at\n",
    "#\n",
    "#   http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing,\n",
    "# software distributed under the License is distributed on an\n",
    "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "# KIND, either express or implied. See the License for the\n",
    "# specific language governing permissions and limitations\n",
    "# under the License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "partition"
   },
   "source": [
    "# Partition\n",
    "\n",
    "<script type=\"text/javascript\">\n",
    "localStorage.setItem('language', 'language-py')\n",
    "</script>\n",
    "\n",
    "<table align=\"left\" style=\"margin-right:1em\">\n",
    "  <td>\n",
    "    <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
    "  </td>\n",
    "</table>\n",
    "\n",
    "<br/><br/><br/>\n",
    "\n",
    "Separates elements in a collection into multiple output\n",
    "collections. The partitioning function contains the logic that determines how\n",
    "to separate the elements of the input collection into each resulting\n",
    "partition output collection.\n",
    "\n",
    "The number of partitions must be determined at graph construction time.\n",
    "You cannot determine the number of partitions in mid-pipeline\n",
    "\n",
    "See more information in the [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#partition)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "setup"
   },
   "source": [
    "## Setup\n",
    "\n",
    "To run a code cell, you can click the **Run cell** button at the top left of the cell,\n",
    "or select it and press **`Shift+Enter`**.\n",
    "Try modifying a code cell and re-running it to see what happens.\n",
    "\n",
    "> To learn more about Colab, see\n",
    "> [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).\n",
    "\n",
    "First, let's install the `apache-beam` module."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "setup-code"
   },
   "outputs": [],
   "source": [
    "!pip install --quiet -U apache-beam"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "examples"
   },
   "source": [
    "## Examples\n",
    "\n",
    "In the following examples, we create a pipeline with a `PCollection` of produce with their icon, name, and duration.\n",
    "Then, we apply `Partition` in multiple ways to split the `PCollection` into multiple `PCollections`.\n",
    "\n",
    "`Partition` accepts a function that receives the number of partitions,\n",
    "and returns the index of the desired partition for the element.\n",
    "The number of partitions passed must be a positive integer,\n",
    "and it must return an integer in the range `0` to `num_partitions-1`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-1-partition-with-a-function"
   },
   "source": [
    "### Example 1: Partition with a function\n",
    "\n",
    "In the following example, we have a known list of durations.\n",
    "We partition the `PCollection` into one `PCollection` for every duration type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "example-1-partition-with-a-function-code"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "\n",
    "durations = ['annual', 'biennial', 'perennial']\n",
    "\n",
    "def by_duration(plant, num_partitions):\n",
    "  return durations.index(plant['duration'])\n",
    "\n",
    "with beam.Pipeline() as pipeline:\n",
    "  annuals, biennials, perennials = (\n",
    "      pipeline\n",
    "      | 'Gardening plants' >> beam.Create([\n",
    "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n",
    "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n",
    "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n",
    "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n",
    "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n",
    "      ])\n",
    "      | 'Partition' >> beam.Partition(by_duration, len(durations))\n",
    "  )\n",
    "\n",
    "  annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x)))\n",
    "  biennials | 'Biennials' >> beam.Map(\n",
    "      lambda x: print('biennial: {}'.format(x)))\n",
    "  perennials | 'Perennials' >> beam.Map(\n",
    "      lambda x: print('perennial: {}'.format(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-1-partition-with-a-function-2"
   },
   "source": [
    "<table align=\"left\" style=\"margin-right:1em\">\n",
    "  <td>\n",
    "    <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
    "  </td>\n",
    "</table>\n",
    "\n",
    "<br/><br/><br/>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-2-partition-with-a-lambda-function"
   },
   "source": [
    "### Example 2: Partition with a lambda function\n",
    "\n",
    "We can also use lambda functions to simplify **Example 1**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "example-2-partition-with-a-lambda-function-code"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "\n",
    "durations = ['annual', 'biennial', 'perennial']\n",
    "\n",
    "with beam.Pipeline() as pipeline:\n",
    "  annuals, biennials, perennials = (\n",
    "      pipeline\n",
    "      | 'Gardening plants' >> beam.Create([\n",
    "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n",
    "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n",
    "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n",
    "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n",
    "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n",
    "      ])\n",
    "      | 'Partition' >> beam.Partition(\n",
    "          lambda plant, num_partitions: durations.index(plant['duration']),\n",
    "          len(durations),\n",
    "      )\n",
    "  )\n",
    "\n",
    "  annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x)))\n",
    "  biennials | 'Biennials' >> beam.Map(\n",
    "      lambda x: print('biennial: {}'.format(x)))\n",
    "  perennials | 'Perennials' >> beam.Map(\n",
    "      lambda x: print('perennial: {}'.format(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-2-partition-with-a-lambda-function-2"
   },
   "source": [
    "<table align=\"left\" style=\"margin-right:1em\">\n",
    "  <td>\n",
    "    <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
    "  </td>\n",
    "</table>\n",
    "\n",
    "<br/><br/><br/>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-3-partition-with-multiple-arguments"
   },
   "source": [
    "### Example 3: Partition with multiple arguments\n",
    "\n",
    "You can pass functions with multiple arguments to `Partition`.\n",
    "They are passed as additional positional arguments or keyword arguments to the function.\n",
    "\n",
    "In machine learning, it is a common task to split data into\n",
    "[training and a testing datasets](https://en.wikipedia.org/wiki/Training,_validation,_and_test_sets).\n",
    "Typically, 80% of the data is used for training a model and 20% is used for testing.\n",
    "\n",
    "In this example, we split a `PCollection` dataset into training and testing datasets.\n",
    "We define `split_dataset`, which takes the `plant` element, `num_partitions`,\n",
    "and an additional argument `ratio`.\n",
    "The `ratio` is a list of numbers which represents the ratio of how many items will go into each partition.\n",
    "`num_partitions` is used by `Partitions` as a positional argument,\n",
    "while `plant` and `ratio` are passed to `split_dataset`.\n",
    "\n",
    "If we want an 80%/20% split, we can specify a ratio of `[8, 2]`, which means that for every 10 elements,\n",
    "8 go into the first partition and 2 go into the second.\n",
    "In order to determine which partition to send each element, we have different buckets.\n",
    "For our case `[8, 2]` has **10** buckets,\n",
    "where the first 8 buckets represent the first partition and the last 2 buckets represent the second partition.\n",
    "\n",
    "First, we check that the ratio list's length corresponds to the `num_partitions` we pass.\n",
    "We then get a bucket index for each element, in the range from 0 to 9 (`num_buckets-1`).\n",
    "We could do `hash(element) % len(ratio)`, but instead we sum all the ASCII characters of the\n",
    "JSON representation to make it deterministic.\n",
    "Finally, we loop through all the elements in the ratio and have a running total to\n",
    "identify the partition index to which that bucket corresponds.\n",
    "\n",
    "This `split_dataset` function is generic enough to support any number of partitions by any ratio.\n",
    "You might want to adapt the bucket assignment to use a more appropriate or randomized hash for your dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "example-3-partition-with-multiple-arguments-code"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "import json\n",
    "\n",
    "def split_dataset(plant, num_partitions, ratio):\n",
    "  assert num_partitions == len(ratio)\n",
    "  bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)\n",
    "  total = 0\n",
    "  for i, part in enumerate(ratio):\n",
    "    total += part\n",
    "    if bucket < total:\n",
    "      return i\n",
    "  return len(ratio) - 1\n",
    "\n",
    "with beam.Pipeline() as pipeline:\n",
    "  train_dataset, test_dataset = (\n",
    "      pipeline\n",
    "      | 'Gardening plants' >> beam.Create([\n",
    "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n",
    "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n",
    "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n",
    "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n",
    "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n",
    "      ])\n",
    "      | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])\n",
    "  )\n",
    "\n",
    "  train_dataset | 'Train' >> beam.Map(lambda x: print('train: {}'.format(x)))\n",
    "  test_dataset | 'Test' >> beam.Map(lambda x: print('test: {}'.format(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "example-3-partition-with-multiple-arguments-2"
   },
   "source": [
    "<table align=\"left\" style=\"margin-right:1em\">\n",
    "  <td>\n",
    "    <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
    "  </td>\n",
    "</table>\n",
    "\n",
    "<br/><br/><br/>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "related-transforms"
   },
   "source": [
    "## Related transforms\n",
    "\n",
    "* [Filter](https://beam.apache.org/documentation/transforms/python/elementwise/filter) is useful if the function is just\n",
    "  deciding whether to output an element or not.\n",
    "* [ParDo](https://beam.apache.org/documentation/transforms/python/elementwise/pardo) is the most general elementwise mapping\n",
    "  operation, and includes other abilities such as multiple output collections and side-inputs.\n",
    "* [CoGroupByKey](https://beam.apache.org/documentation/transforms/python/aggregation/cogroupbykey)\n",
    "performs a per-key equijoin.\n",
    "\n",
    "<table align=\"left\" style=\"margin-right:1em\">\n",
    "  <td>\n",
    "    <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
    "  </td>\n",
    "</table>\n",
    "\n",
    "<br/><br/><br/>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "view-the-docs-bottom"
   },
   "source": [
    "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/partition\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "name": "Partition - element-wise transform",
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "python3",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
